
[WIP] Batch pushes and pops #103

Open
raphael-s-steiner wants to merge 20 commits into max0x7ba:master from raphael-s-steiner:master

Conversation

@raphael-s-steiner

This pull request adds batch pushes and pops using iterator semantics to alleviate pressure on the atomic heads and tails of the queue.

In particular, it adds the following functions with the following signatures:

  1. ATOMIC_QUEUE_INLINE InputIt try_push(InputIt first, InputIt const last) noexcept
  2. ATOMIC_QUEUE_INLINE int try_pop(OutputIt& first, int n) noexcept
  3. ATOMIC_QUEUE_INLINE InputIt push(InputIt first, InputIt const last) noexcept
  4. ATOMIC_QUEUE_INLINE OutputIt pop(OutputIt first, unsigned n) noexcept

Some details:

  1. The return iterator is one past the last element pushed to the queue, which can of course differ from last when the queue is full.
  2. The return integer is the number of successful pops, which can be smaller than the desired number of pops n. The iterator is taken by reference; after the call it points one past the last element popped from the queue. This allows a more efficient implementation of the RetryDecorator and other use cases where both the number of pops and the iterator are required. Note that returning the iterator alone is not sufficient for iterator mimics such as std::back_inserter.
  3. The return iterator is one past the last element pushed to the queue.
  4. The return iterator is one past the last element popped from the queue.

Note that int in 2. and unsigned in 4. are chosen deliberately so that the implementation performs fewer conversions and is more efficient.
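A minimal usage sketch of the proposed batch API (illustrative only; the queue type, capacity and values are chosen for the example, and AtomicQueueB reserves the NIL value, 0 by default, which must not be pushed):

    #include <atomic_queue/atomic_queue.h>
    #include <vector>

    int main() {
        atomic_queue::AtomicQueueB<unsigned> q(1024);

        // Batch push: the returned iterator is one past the last pushed element,
        // so it equals src.end() only if the whole range fit into the queue.
        std::vector<unsigned> src{1, 2, 3, 4}; // 0 is NIL and must not be pushed
        auto pushed_end = q.try_push(src.begin(), src.end());

        // Batch pop: the output iterator is taken by reference and advanced;
        // the return value is the number of elements actually popped (<= 4).
        std::vector<unsigned> dst(4);
        auto out = dst.begin();
        int popped = q.try_pop(out, 4);

        return pushed_end == src.end() && popped == 4 ? 0 : 1;
    }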

@max0x7ba
Owner

Wow, Raphael, this is a massive contribution with no precedents in the past.

I am most thankful, of course, but the feeling of being most impressed overwhelms me.

«You will know them by their fruit» and your fruit looks like a product of the most delicate labour of love to me 🤷‍♂️💯.

Comment thread on src/tests.cc (outdated)
@raphael-s-steiner
Author

> Wow, Raphael, this is a massive contribution with no precedents in the past.
>
> I am most thankful, of course, but the feeling of being most impressed overwhelms me.
>
> «You will know them by their fruit» and your fruit looks like a product of the most delicate labour of love to me 🤷‍♂️💯.

Thank you for your kind comments and for such a great library. Truly one of the greatest and most thoroughly engineered multi-producer multi-consumer queues out there.

@max0x7ba
Owner

The unit-tests fail because they take too long to execute:

2/2 tests   TIMEOUT        30.04s   killed by signal 15 SIGTERM

It looks like the tests get stuck in:

src/tests.cc(103): Entering test case "stress_batch<atomic_queue__CapacityArgAdaptor<atomic_queue__AtomicQueueB<unsigned int_ std__allocator<unsigned int>_ 0u_ true_ false_ true>_ 4096ul>>"


@max0x7ba max0x7ba left a comment


It is probably the missing size checks in push and pop that cause the unit-tests to deadlock.

unsigned head;
if(Derived::spsc_) {
head = head_.load(X);
head_.store(head + n, X);
Owner


n can be greater than the buffer size or the number of free slots in the queue. These conditions must be checked.
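A sketch of the kind of check meant here (a fragment, not the PR's code; it reuses the monotonic head_/tail_ counters, Derived::spsc_ and the memory order X from the quoted context, and assumes a size_ member holding the capacity):

    unsigned head;
    if(Derived::spsc_) {
        head = head_.load(X);
        // Clamp the batch to the number of free slots before advancing head_,
        // so that n never exceeds the capacity or overruns unconsumed slots.
        unsigned const free_slots = size_ - (head - tail_.load(X));
        if(n > free_slots)
            n = free_slots;
        if(!n)
            return first; // queue full, nothing pushed
        head_.store(head + n, X);
    }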

Author


There does seem to be an issue here, but it is not trivial to pinpoint the exact data race that a wrap-around of the buffer triggers.

I will need some time to figure this one out.

Author


There was an issue in the tests: the stopping condition was not correctly implemented when CONSUMERS * BATCH_SIZE > CAPACITY. This is now fixed.

In general, the issue you are describing can already occur when CONSUMERS * BATCH_SIZE > CAPACITY, meaning it can happen with many consumer threads even with single optimistic pushes and pops.

The good news is that this is not a problem for deadlocking (when the (batched) optimistic queue is handled properly). I shall sketch why this is the case later. Tests have been added to cover this case.

The bad news is that when CONSUMERS * BATCH_SIZE > CAPACITY, pushes and pops can happen out of order. For example, two producers can be allocated slots head_1 and head_2 with head_1 < head_2 and head_1 % CAPACITY == head_2 % CAPACITY (say, head_1 = 3 and head_2 = 7 with CAPACITY = 4), and it can happen that the producer with allocated slot head_2 pushes to the slot first while the producer with allocated slot head_1 has to wait. This is an issue in so far as the queue no longer acts as a FIFO.

The sketch of why this does not deadlock is that one can imagine the two producers swapping "roles" when this happens, i.e. you swap the data that would be pushed to head_1 and head_2 and pretend that it was the first producer, corresponding to head_1, that did the push, and the second producer is the one waiting. (I need to write this down better.)

Author


The case BATCH_SIZE > CAPACITY is not an issue; it just means the producer has to wait until the consumers have popped enough for the producer to continue pushing.

Owner

@max0x7ba max0x7ba Apr 24, 2026


> In general, the issue you are describing can already occur when CONSUMERS * BATCH_SIZE > CAPACITY, meaning it can happen with many consumer threads even with single optimistic pushes and pops. The good news is that this is not a problem for deadlocking (when the (batched) optimistic queue is handled properly). I shall sketch why this is the case later. Tests have been added to cover this case.

The README mentions this queue-full condition. The queue code paths are intended to handle it at no extra cost. The unit-test exercises the queue-full condition by pushing orders of magnitude more messages than the queue capacity.

> The bad news is that when CONSUMERS * BATCH_SIZE > CAPACITY, pushes and pops can happen out of order. For example, two producers can be allocated slots head_1 and head_2 with head_1 < head_2 and head_1 % CAPACITY == head_2 % CAPACITY, and it can happen that the producer with allocated slot head_2 pushes to the slot first while the producer with allocated slot head_1 has to wait.

For a slot, pop#0 waits for push#0 to complete. When the wrapped head_ reaches the unconsumed tail_, push#1 has to wait for pop#0 to complete. An unconsumed slot blocks any subsequent push into that slot.

An extreme case is a queue with 1 slot used by >=2 producers and >=2 consumers, all doing only optimistic push/pop. That makes all producers compete in push, and all consumers compete in pop, for the one slot.

> This is an issue in so far as the queue no longer acts as a FIFO. The sketch of why this does not deadlock is that one can imagine the two producers swapping "roles" when this happens, i.e. you swap the data that would be pushed to head_1 and head_2 and pretend that it was the first producer, corresponding to head_1, that did the push, and the second producer is the one waiting. (I need to write this down better.)

The current zero-cost solution for this scenario is queue capacity >= max(n_consumers, n_producers), in which case only 1 producer can ever get blocked on an unconsumed slot. But there are no unit-tests or asserts ensuring that capacity >= max(n_consumers, n_producers), because the queues don't require the actual number of their producers/consumers to be specified in order to be declared or constructed.

A queue member function could be added to perform this check, if/when the caller has the n_consumers and n_producers numbers available.
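A minimal sketch of such a member function (hypothetical, not part of the library; it assumes a capacity() accessor returning the number of slots):

    // Hypothetical helper: verify the zero-cost condition discussed above
    // once the caller knows its producer and consumer thread counts.
    // Needs <algorithm> for std::max.
    bool is_capacity_sufficient(unsigned n_producers, unsigned n_consumers) const noexcept {
        return capacity() >= std::max(n_producers, n_consumers);
    }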

The queues are designed for low latency, which requires the user to ensure that the condition capacity >= max(n_consumers, n_producers) holds true. The FIFO order / fairness property emerges at zero cost when there are no multiple producers or consumers competing for a slot. That's quite neat, but little else than serendipity: unplanned greatness rather than the objective. Maintaining FIFO order has non-zero cost in general, because it requires delaying the progress of competing threads, which conflicts with the low-latency objective, itself incompatible with delaying anything.

Hitting the queue-full condition, though, defeats the objective of using these queues in the first place. This condition means the queue is not drained fast enough. Full queues guarantee only the worst latencies.

Guaranteeing FIFO order requires disabling thread preemption while in user-space critical sections, to prevent descheduling threads before they complete and unlock critical sections. Without being able to disable thread preemption for user-space critical sections, FIFO order / fairness is not possible in principle, and not worth burning a single CPU cycle for.


The kernel blocks and queues mutex/futex waiter threads as they arrive and unblocks them in the original FIFO order of blocking. std::condition_variable::wait guarantees FIFO order only when its associated std::mutex is locked prior to calling std::condition_variable::wait and unlocked only after calling std::condition_variable::notify*. These are the original POSIX Threads API requirements/guarantees, adopted by C++ verbatim, with the C++ multi-threading APIs being little else than thin wrappers over the POSIX Threads API. (I abandoned Windows for Linux in 2003 and wouldn't be able to comment on anything other than Linux.)

SunOS exposed functions to disable interrupts/preemption of threads from user space, and one could call those directly. Sun's C standard library implementation disabled interrupts/preemption in pthread_mutex_lock/pthread_spin_lock and re-enabled them in pthread_mutex_unlock/pthread_spin_unlock, for example, to prevent preempting threads in the middle of critical sections, after locking a mutex but before unlocking it. And that was the most desirable OS behaviour.

The Linux kernel uses all these capabilities to prevent de-scheduling threads that hold spin-locks and futexes in its kernel code, but does not expose any of them to user-space code, because users would only ever deadlock themselves in their spin-locks with interrupts disabled, and that's like giving matches to toddlers.

unsigned tail;
if(Derived::spsc_) {
tail = tail_.load(X);
tail_.store(tail + n, X);
Owner


Same issue here.
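The analogous sketch for the pop side, under the same assumptions as the push-side sketch above, clamps the batch to the number of elements actually available:

    unsigned tail;
    if(Derived::spsc_) {
        tail = tail_.load(X);
        // Clamp the batch to the available elements before advancing tail_.
        unsigned const available = head_.load(X) - tail;
        if(static_cast<unsigned>(n) > available)
            n = available;
        if(!n)
            return 0; // queue empty, nothing popped
        tail_.store(tail + n, X);
    }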

@raphael-s-steiner
Author

raphael-s-steiner commented Apr 21, 2026

> The unit-tests fail because they take too long to execute:
>
> 2/2 tests   TIMEOUT        30.04s   killed by signal 15 SIGTERM
>
> It looks like the tests get stuck in:
>
> src/tests.cc(103): Entering test case "stress_batch<atomic_queue__CapacityArgAdaptor<atomic_queue__AtomicQueueB<unsigned int_ std__allocator<unsigned int>_ 0u_ true_ false_ true>_ 4096ul>>"

The batch sizes in the tests are too small relative to the capacity to trigger the mentioned issue (to be addressed). The more likely culprit here is that the tests take longer with the sanitizers. On my machine, they run in a little more than 60s. The additional tests in the current PR increase the test time by 5x, and the pre-existing tests take around 12-13s.

@max0x7ba
Owner

Maybe do shorter tests with sanitisers?

Building with sanitizers defines extra macros that can be used to adjust the number of test iterations.

Maybe use random batch sizes.
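For reference, a sketch of one way to detect sanitizer builds (the exact macros this project defines are not shown here; GCC predefines __SANITIZE_THREAD__ and Clang exposes __has_feature(thread_sanitizer)):

    // Detect a ThreadSanitizer build and scale the test iterations down.
    #if defined(__SANITIZE_THREAD__) // GCC
    #    define UNDER_TSAN 1
    #elif defined(__has_feature) // Clang
    #    if __has_feature(thread_sanitizer)
    #        define UNDER_TSAN 1
    #    endif
    #endif
    #ifndef UNDER_TSAN
    #    define UNDER_TSAN 0
    #endif

    constexpr int N_ITERATIONS = UNDER_TSAN ? 100'000 : 1'000'000;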

@max0x7ba
Owner

max0x7ba commented Apr 21, 2026

Thinking more about iterators: it is conceivable that the caller of push knows the exact size of the iterator range, yet the iterators may not necessarily be of random-access category. std::distance complexity is O(1) for random-access iterators only, and O(n) for anything else.

Calling std::distance has non-zero cost in general, so we must not call std::distance. Let the caller supply the length of the iterator range; it may already have the length available.

The zero-cost batch interface is:

push(input_iterator begin, unsigned size);

It also enables passing in any kind of iterator, including single-pass input iterators, which are often generator objects producing the next value in their overloaded operator*(). The latter can also be handy for unit-testing.
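A sketch of such a generator object (a hypothetical helper for illustration, not part of the library or this PR): a single-pass input iterator that produces consecutive values in its operator*():

    #include <cstddef>
    #include <iterator>

    // Minimal single-pass "generator" iterator: each dereference produces
    // the current value; incrementing advances to the next one.
    struct CountingIterator {
        using iterator_category = std::input_iterator_tag;
        using value_type = unsigned;
        using difference_type = std::ptrdiff_t;
        using pointer = unsigned const*;
        using reference = unsigned;

        unsigned value;

        unsigned operator*() const noexcept { return value; }
        CountingIterator& operator++() noexcept { ++value; return *this; }
        CountingIterator operator++(int) noexcept { auto t = *this; ++value; return t; }
    };

With the size-taking interface no end iterator is needed, e.g. q.push(CountingIterator{1}, 1000u) pushes the values 1 to 1000.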

@max0x7ba
Owner

> The batch sizes in the tests are too small relative to the capacity to trigger the mentioned issue

GitHub Actions hosts may be using the cheapest shared CPUs, so threads get little CPU time. Consumer threads may get delayed, and the queues can easily get full in the unit-tests.

@raphael-s-steiner raphael-s-steiner changed the title Batch pushes and pops [WIP] Batch pushes and pops Apr 23, 2026
@raphael-s-steiner raphael-s-steiner marked this pull request as draft April 23, 2026 05:10
@raphael-s-steiner
Author

I extended the test cases to cover large BATCH_SIZE, including ones that are > CAPACITY. The batch size is now random every time. The Meson tests should now run in about 20-22 seconds.

I converted the PR back to draft to do the following:

  1. Add an iterator batch push with a supplied length
  2. Add better documentation of what happens with large batch sizes and why this does not deadlock (beyond knowing that in the optimistic queue pushes and pops are "borrowed" when the queue is full or empty, respectively, and this debt must be paid to avoid deadlock). (For a sketch, see an earlier comment.)
  3. Add documentation of the interaction between the remapping of the index (to avoid false sharing) and the batch size (see the sketch below).
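Regarding item 3, a generic sketch of the index-remapping idea (illustrative assumptions: 4-byte elements and 64-byte cache lines; the library's actual remapping differs in detail). Swapping the two low 4-bit groups of an index makes logically adjacent slots land on different cache lines, which is exactly the property that interacts with batch placement:

    // Illustrative remapping: swap the low 4 bits with the next 4 bits.
    // Logically adjacent indexes then map 16 elements (one 64-byte cache
    // line of 4-byte elements) apart, so neighbours do not share a line.
    constexpr unsigned remap(unsigned index) noexcept {
        constexpr unsigned BITS = 4; // log2(line size / element size)
        constexpr unsigned MASK = (1u << BITS) - 1;
        unsigned const lo = index & MASK;
        unsigned const hi = (index >> BITS) & MASK;
        return (index & ~((MASK << BITS) | MASK)) | (lo << BITS) | hi;
    }

A batch of consecutive logical indexes therefore spreads across distinct cache lines after remapping, which is the interaction with batch size worth documenting.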

@max0x7ba max0x7ba marked this pull request as ready for review April 24, 2026 05:27
@max0x7ba
Owner

> I extended the test cases to cover large BATCH_SIZE, including ones that are > CAPACITY. The batch size is now random every time. The Meson tests should now run in about 20-22 seconds.
>
> I converted the PR back to draft to do the following:
>
>   1. Add an iterator batch push with a supplied length
>   2. Add better documentation of what happens with large batch sizes and why this does not deadlock (beyond knowing that in the optimistic queue pushes and pops are "borrowed" when the queue is full or empty, respectively, and this debt must be paid to avoid deadlock). (For a sketch, see an earlier comment.)
>   3. Add documentation of the interaction between the remapping of the index (to avoid false sharing) and the batch size.

Thank you for making the requested changes, Raphael, much appreciated.

I navigated here to merge your PR, but the PR diff shows only your original changes for atomic_queue.h without any updates you mention.

That could be related to "[raphael-s-steiner] marked this pull request as draft [yesterday]", because "mark as draft" warns "People who are already subscribed will not be unsubscribed.", which probably means that "mark as draft" creates another divergent copy elsewhere. I may be wrong, as this is the first time I have encountered such an issue.

@max0x7ba
Owner

max0x7ba commented Apr 24, 2026

> I navigated here to merge your PR, but the PR diff shows only your original changes for atomic_queue.h without any updates you mention.

I just pulled your branch onto my workstation to verify, and this is what I find:

10 matches in 9 lines for "\(try_\| \)\(push\|pop\)" in buffer: atomic_queue.h
    431:    ATOMIC_QUEUE_INLINE bool try_push(T&& element) noexcept {
    450:    ATOMIC_QUEUE_INLINE InputIt try_push(InputIt first, InputIt const last) noexcept {
    477:    ATOMIC_QUEUE_INLINE bool try_pop(T& element) noexcept {
    496:    ATOMIC_QUEUE_INLINE int try_pop(OutputIt& first, int n) noexcept {
    523:    ATOMIC_QUEUE_INLINE void push(T&& element) noexcept {
    537:    ATOMIC_QUEUE_INLINE InputIt push(InputIt first, InputIt const last) noexcept {
    554:    ATOMIC_QUEUE_INLINE auto pop() noexcept {
    568:    ATOMIC_QUEUE_INLINE OutputIt pop(OutputIt first, unsigned n) noexcept {
    593:        // tail_ can be greater than head_ because of consumers doing pop, rather that try_pop, when the queue is empty.

push and try_push accept a pair of iterators, unlike pop and try_pop.

